steampipes
Fast, simple data pipelines built from first principles. Basically, datomic transducers.
SteamPipes is the successor to PipeStreams and PipeDreams. PipeStreams was originally built on top of NodeJS streams and through; from version X███████████████ on, I switched to pull-streams.
Valid SteamPipes sources include all JS values for which either
CS │ JS
──────────────────────────────┼─────────────────────────────────────
for d from source │ for ( d of source ) {
... │ ... }
──────────────────────────────┴─────────────────────────────────────
or
CS │ JS
──────────────────────────────┼─────────────────────────────────────
for await d from source │ for await ( d of source ) {
... │ ... }
──────────────────────────────┴─────────────────────────────────────
is valid.
In addition, synchronous and asynchronous functions that, when called without arguments, return a value for
which one of the iteration modes (sync or async) works correctly are allowed. Such a function will be called
as late as possible, that is, not at pipeline definition time, but only when a pipeline with a source and a
drain has been constructed and is started with pull().
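The rules for sources above can be modelled in a few lines of plain JavaScript. This is only a sketch of the idea, not SteamPipes code: `iterateSource` and `collect` are hypothetical names, and the only assumption is that a function source is called without arguments when iteration starts.

```javascript
// Hypothetical helper, not part of the SteamPipes API.
// Accepts a sync iterable, an async iterable, or a (sync or async)
// function that returns one of these when called without arguments.
async function* iterateSource(source) {
  // A generator body only runs on the first next() call, so a function
  // source is called as late as possible, not at definition time.
  const value = typeof source === 'function' ? await source() : source;
  // Inside an async generator, yield* accepts sync and async iterables.
  yield* value;
}

// Hypothetical helper that drains a source into a list.
async function collect(source) {
  const out = [];
  for await (const d of iterateSource(source)) out.push(d);
  return out;
}
```

Calling `collect('abc')`, `collect(() => [1, 2])` or `collect(async () => someAsyncGenerator())` all go through the same code path; a value that supports neither iteration mode makes `yield*` throw a `TypeError`.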
Functions that take 2 arguments d and send (includes send.end());
must/should/may have a list (Array) that acts as so-called 'local sink' (this is where data sent with send d is stored before being passed to the next transform);
property to indicate whether transform is asynchronous.
transforms have a property sink, which must be a list (at least have a shift() method);
TF may add (ordinarily push()) values to the sink at any time (but processing is only guaranteed when this
happens, in TFs marked synchronous, before the main body of the function has completed, and in TFs marked
asynchronous, before done() has been called).
conceivable to use same TF, same sink in two or more pipelines simultaneously; conceivable to accept
values from other sources than the TF which is directly upstream; hence possible to construct wyes (i.e.
data sources that appear in mid-stream).
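To make the 'local sink' idea concrete, here is a minimal model in plain JavaScript. It is a sketch under assumptions, not the library's implementation: `makeTransform` and `runSync` are hypothetical names, and only synchronous transforms are covered.

```javascript
// Hypothetical model of a synchronous transform with a local sink.
function makeTransform(fn) {
  const sink = [];                          // the 'local sink'
  const send = (d) => { sink.push(d); };    // send stores data in the sink
  return { sink, call: (d) => fn(d, send) };
}

// Hypothetical runner: passes each datom through all transforms in turn,
// emptying each local sink with shift() before moving downstream.
function runSync(source, transforms) {
  const results = [];
  for (const d of source) {
    let batch = [d];
    for (const tf of transforms) {
      const next = [];
      for (const item of batch) {
        tf.call(item);
        while (tf.sink.length > 0) next.push(tf.sink.shift());
      }
      batch = next;
    }
    results.push(...batch);
  }
  return results;
}
```

Because the runner only ever looks at `tf.sink`, anything that pushes into that list before it is emptied (not just the transform's own `send`) ends up downstream, which is the property that makes wyes conceivable.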
Calling $ whatever..., ( d, send ) -> ... is always equivalent to calling modify whatever..., $ ( d, send ) -> ...;
calling modify t without any further arguments is equivalent to t (the transform itself).
@chunkify_*(): cut stream by observing boundaries. Depending on whether to keep or toss datoms
recognized by the filter function as boundaries, use either
@$chunkify_keep = ( filter, postprocess = null ) ->
or
@$chunkify_toss = ( filter, postprocess = null ) ->
The second, optional postprocess argument must be a function when given; it will receive a list of
datoms and may return any value which will then be sent on. Sample application:
filter = ( d ) -> d in [ '(', ')', ]
postprocess = ( chunk ) -> chunk.join '|'
pipeline = []
pipeline.push 'ab(cdefg)'
pipeline.push SP.$chunkify_keep filter, postprocess
pipeline.push SP.$show()
pipeline.push SP.$drain -> resolve()
SP.pull pipeline...
will print
'a|b|('
'c|d|e|f|g|)'
Had we used SP.$chunkify_toss filter, postprocess instead, the output would have been
'a|b'
'c|d|e|f|g'
So if one just wanted to collect all stream items into a single list, one would use either SP.$collect()
or else an argument to the drain transform, as in SP.$drain ( collector ) -> resolve collector; if one
wanted to collect all stream items into multiple lists, then SP.$chunkify_{keep|toss} filter, ... is the
way to go.
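The keep/toss behavior can be sketched without the library. The function below is a plain-JavaScript stand-in, not the SteamPipes implementation; `chunkify` and its `keep` flag are hypothetical, and dropping empty chunks is an assumption made for the sketch.

```javascript
// Hypothetical stand-in for $chunkify_keep / $chunkify_toss.
// keep = true  keeps boundary datoms at the end of their chunk,
// keep = false tosses them.
function chunkify(source, filter, keep, postprocess = null) {
  const results = [];
  let chunk = [];
  const flush = () => {
    if (chunk.length === 0) return;     // assumption: empty chunks are not sent
    results.push(postprocess ? postprocess(chunk) : chunk);
    chunk = [];
  };
  for (const d of source) {
    if (filter(d)) {
      if (keep) chunk.push(d);          // boundary closes the current chunk
      flush();
    } else {
      chunk.push(d);
    }
  }
  flush();                              // send whatever is left at the end
  return results;
}

const filter = (d) => d === '(' || d === ')';
const postprocess = (chunk) => chunk.join('|');
const kept = chunkify('ab(cdefg)', filter, true, postprocess);
const tossed = chunkify('ab(cdefg)', filter, false, postprocess);
```

With the sample input from above, `kept` is `[ 'a|b|(', 'c|d|e|f|g|)' ]` and `tossed` is `[ 'a|b', 'c|d|e|f|g' ]`.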
$before_first(), $after_last()
{ first, last, before, after, between, }
$before_first()
$after_last()
$async_before_first()
$async_after_last()
Arbitrary objects can act as sinks provided they have a sink property; this property must be either set to
true for a generic sink or else be an object that has a push() method (such as a list). A sink may,
furthermore, also have an on_end() method which, if set, must be a function that takes zero or one
argument.
If the sink property is a list, then it will receive all data items that arrive through the pipeline (the
resultant data of the pipeline); if it is true, then those data items will be discarded.
The on_end() method will be called when streaming has terminated (since the source was exhausted or a
transform called send.end()); if it takes one argument, then that will be the list of resultant data. If
both the sink property has been set to a list and on_end() takes an argument, then that value will be
the sink property (you probably only want the one or the other in most cases).
{ sink: true, }
{ sink: true, on_end: ( -> do_something() ), }
{ sink: true, on_end: ( ( result ) -> do_something result ), }
{ sink: x, on_end: ( ( result ) -> do_something result ### NB result is x ### ), }
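A rough model of how such sink objects could be serviced, in plain JavaScript. This is a sketch and not the library's code: `feedSink` is a hypothetical name, and checking the declared parameter count via `Function.prototype.length` is an assumption about how "takes zero or one argument" might be detected.

```javascript
// Hypothetical model: deliver all data to a sink object, then call on_end().
function feedSink(spec, data) {
  const hasOnEnd = typeof spec.on_end === 'function';
  // fn.length is the number of declared parameters of the function.
  const wantsResult = hasOnEnd && spec.on_end.length === 1;
  let target = null;                        // null means: discard data
  if (spec.sink !== true) target = spec.sink;   // caller-supplied list
  else if (wantsResult) target = [];        // collect only for on_end( result )
  for (const d of data) {
    if (target !== null) target.push(d);
  }
  if (hasOnEnd) {
    if (wantsResult) spec.on_end(target);
    else spec.on_end();
  }
}
```

With `{ sink: x, on_end: (result) => ... }`, the `result` handed to `on_end` is the very same object as `x`, matching the NB in the last example above.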
The only SteamPipes method that produces a sink is $drain() (it should really be called sink(), but the
name has been kept for compatibility with PipeStreams, where it is a holdover from pull-stream). $drain()
takes zero, one or two arguments:
$drain()                          is equiv. to  { sink: true, }
$drain -> ...                     is equiv. to  { sink: true, on_end: ( -> ... ), }
$drain ( x ) -> ...               is equiv. to  { sink: true, on_end: ( ( x ) -> ... ), }
$drain { sink: x, }, -> ...       is equiv. to  { sink: x, on_end: ( -> ... ), }
$drain { sink: x, }, ( x ) -> ... is equiv. to  { sink: x, on_end: ( ( x ) -> ... ), }
Asynchronous transforms can be constructed using the 'asynchronous remit' method, $async(). The method
passed into $async() must accept three arguments, namely d (the data item coming down the pipeline),
send (the method to send data down the pipeline), and, unlike synchronous transforms, a third argument
done, which is a callback function used to signal completion (it is analogous to the resolve argument of
promises, new Promise ( resolve, reject ) ->, and indeed implemented as such). An example:
X███████████████
X███████████████
X███████████████
X███████████████
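Independently of the example above, the relationship between done and a promise's resolve can be illustrated in plain JavaScript. This is a hypothetical model, not the code behind $async(): `callAsyncTransform` is an invented name, and it only shows how a `( d, send, done )` function could be awaited.

```javascript
// Hypothetical model: run one ( d, send, done ) transform for one datom
// and resolve with everything it sent once done() has been called.
function callAsyncTransform(fn, d) {
  const sink = [];
  const send = (x) => { sink.push(x); };
  return new Promise((resolve) => {
    // done is nothing but the promise's resolve in disguise; an exception
    // thrown synchronously by fn rejects the promise.
    const done = () => resolve(sink);
    fn(d, send, done);
  });
}

// A transform that sends its result after a short delay.
const delayedDouble = (d, send, done) => {
  setTimeout(() => { send(d * 2); done(); }, 5);
};
```

`await callAsyncTransform(delayedDouble, 3)` then yields the list of values sent before `done()` was called.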
I. Special Arities
There are two special duct arities, empty and single. An empty pipeline produces a duct marked with
is_empty: true; it is always a no-op, hence discardable. The duct does not have a type property.
A pipeline with a single element produces a duct with the property is_single: true; it is always
equivalent to its sole transform, and its type property is that of its sole element.
SHAPE OF PIPELINE                 SHAPE OF DUCT                REMARKS
⋆ []                           ⇨  { is_empty: true, }          # equiv. to a no-op
⋆ [ x, ]                       ⇨  { is_single: true, }         # equiv. to its single member
II. Open Ducts
Open ducts may always take the place of a non-composite element of the same type; this is what makes
pipelines composable. As one can always replace a sequence like ( x += a ); ( x += b ); by a
non-composed equivalent ( x += a + b ), so can one replace a non-composite through (i.e. a single
function that transforms values) with a composite one (i.e. a list of throughs), and so on:
SHAPE OF PIPELINE                 SHAPE OF DUCT                REMARKS
⋆ [ source, transforms..., ]   ⇨  { type: 'source', }          # equiv. to a non-composite source
⋆ [ transforms..., ]           ⇨  { type: 'through', }         # equiv. to a non-composite transform
⋆ [ transforms..., sink, ]     ⇨  { type: 'sink', }            # equiv. to a non-composite sink
III. Closed Ducts
Closed ducts are pipelines that have both a source and a sink (plus any number of throughs). They are like a
closed electric circuit and will start running when being passed to the pull() method (but note that
actual data flow may be indefinitely postponed in case the source does not start delivering immediately).
SHAPE OF PIPELINE                       SHAPE OF DUCT                REMARKS
⋆ [ source, transforms..., sink, ]   ⇨  { type: 'circuit', }         # ready to run
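The three tables (special arities, open ducts, closed ducts) amount to a small classification rule, sketched here in plain JavaScript. The `kind` tag on each element is a hypothetical stand-in for however the library tells sources, throughs and sinks apart, and `classifyPipeline` is not a SteamPipes function.

```javascript
// Hypothetical classifier: elements are tagged { kind: 'source' | 'through' | 'sink' }.
function classifyPipeline(pipeline) {
  if (pipeline.length === 0) return { is_empty: true };   // no type property
  if (pipeline.length === 1) {
    // A single-element duct takes the type of its sole element.
    return { is_single: true, type: pipeline[0].kind };
  }
  const hasSource = pipeline[0].kind === 'source';
  const hasSink = pipeline[pipeline.length - 1].kind === 'sink';
  if (hasSource && hasSink) return { type: 'circuit' };   // closed duct, ready to run
  if (hasSource) return { type: 'source' };               // open duct
  if (hasSink) return { type: 'sink' };                   // open duct
  return { type: 'through' };                             // open duct
}
```

For instance, `classifyPipeline([ { kind: 'source' }, { kind: 'through' } ])` gives `{ type: 'source' }`, which is why such a list can stand in wherever a non-composite source is expected.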
Two ways to end a stream from inside a transform: either send.end(), or send SP.symbols.end.
The two methods are 100% identical. In SteamPipes, 'ending a stream' means 'to break from the loop that iterates over the data source'.
Note that when the pull method receives an end signal, it will not request any further data from the
source, but it will allow all data that is already in the pipeline to reach the sink just as in regular
operation, and it will also supply all transforms that have requested a last value with such a terminal
value.
Any of these actions may cause any of the transforms to issue an unlimited number of further values, so
that, in the general case, ending a stream is not guaranteed to actually stop processing at any point in
time; this is only true for properly coöperating transforms.
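A toy version of the end signal, in plain JavaScript, may help to see why ending is 'breaking from the loop' rather than stopping everything at once. This is a sketch only: `END` stands in for SP.symbols.end, `pullOne` is a hypothetical single-transform puller, and the handling of last values is left out.

```javascript
// Hypothetical stand-in for SP.symbols.end.
const END = Symbol('end');

// Hypothetical puller for a pipeline with one source and one transform.
function pullOne(source, transform) {
  const results = [];
  let ended = false;
  const send = (d) => {
    if (d === END) ended = true;    // remember the signal ...
    else results.push(d);           // ... but keep accepting data
  };
  send.end = () => send(END);       // send.end() and send(END) are the same thing
  for (const d of source) {
    transform(d, send);
    if (ended) break;               // no further data is requested from the source
  }
  return results;
}

const stopAtThree = (d, send) => {
  if (d === 3) { send.end(); send('late'); } else { send(d); }
};
const seenValues = pullOne([1, 2, 3, 4], stopAtThree);
```

Here `seenValues` is `[ 1, 2, 'late' ]`: the value sent after the end signal still reaches the results, while `4` is never requested from the source.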
There's no API to abort a stream (i.e. make the stream and all transforms stop processing immediately), but one
can always wrap the pull pipeline... invocation into a try/catch clause and throw a custom symbolic
value:
pipeline = []
...
pipeline.push $ ( d, send ) ->
  ...
  throw 'abort'
  ...
...
try
  pull pipeline...
catch error
  throw error if error isnt 'abort'
  warn "the stream was aborted"
...
start(), it will be called when SP.pull pipeline... is called; this enables
push sources to delay issuing data until the pipeline is ready to consume it
### TAINT how can undefined end up in transforms??? ### in pull-remit.coffee
Fix bug:
source = SP.new_push_source()
source.send 1
source.send 2
pipeline = []
pipeline.push source
pipeline.push SP.$show()
pipeline.push $drain ->
  urge '^2262^', "demo_stream ended"
  resolve()
source.end() # (1)
SP.pull pipeline...
# source.end() # (2)
With (1), the drain condition never triggers; only (2) works as intended; i.o.w. source.end() must
not be called before SP.pull(). This is not acceptable.
$drain() should be allowed to appear mid-stream (it would then pull data from upstream,
downstream must rely on own $drain() to obtain data).
done() method to signal finishing, synchronous transforms don't have that, so it is not clear how to deal with a
situation where a transform happens to decide it doesn't want to send() anything (although, the
transform does return (stop running), so that might be a way)
yield instead of send() is not a good idea
$split() work with both streams and buffers
send.skip n (or send.drop n) to drop next n datoms
$split_tsv()
$once_before_first()
$once_after_last()
$once_async_before_first()
$once_async_after_last()
$once_with_first()
$once_async_with_first()
$once_with_last()
$once_async_with_last()
$once_with_nth()
$once_async_with_nth()
using modifiers instead ($ { once: [ 'first', 'last', ], }, transform (d, send ) ->) to avoid API bloat
{ first, last, } modifiers is used and uses the $/remit method of another instance of the SteamPipes library,
then that transform will get to see the first value, but not the last; presumably, this is caused by the
buckets not being shared between the pipeline at large and the transform?
( require 'pathA/steampipes' ).new_push_source() is used in a pipeline that
is activated by ( require 'pathB/steampipes' ).pull pipeline... and an error with message
^steampipes/pullremit@7003^ expected an iterable, a function, a generator function or a sink, got a object
results. The message should at least hint at the probable error cause or be avoided altogether.
FIXED in v6.2: replaced Symbol 'xy' with Symbol.for 'steampipes/xy' in SP.marks.
$drain(), attached to push_source.start and possibly other places
are not called or not called with await, thus causing silent failures. Must always reject loudly where
detected or be handled appropriately.
$chunkify() (as configurable variant of $collect()?); see code comment in standard transforms
$split() with one based on intertext-splitlines
$batch size
see Breaking Chains with Pipelines in Modern JavaScript
const result3 = numbers
  |> filter(#, v => v % 2 === 0)
  |> map(#, v => v + 1)
  |> slice(#, 0, 3)
  |> Array.from;
Transform categorization: functions may
acc. to result arity: ($map()), ($watch()), ($/remit()), ($watch())
acc. to iterability
acc. to synchronicity
acc. to happiness: throw for sad results ([].indexOf())
pipeline definition may take on this form:
¶ = ( pipeline = [] ).push.bind pipeline
¶ tee other_pipeline, ( d ) -> 110 <= d <= 119 # optional filter, all `d`s stay in this pipeline, some also in other
¶ switch other_pipeline, ( d ) -> 110 <= d <= 119 # obligatory filter, each `d` in only one pipeline
¶ watch ( d ) -> ... # return value thrown away (does that respect async functions?)
¶ guard -1, $indexOf 'helo' # guard with filter value, saddens value when `true <- CND.equals(...)`
¶ guard ( ( d ) -> ... ), indexOf 'helo' # guard with filter function, saddens value when `true <- filter()`
¶ trycatch map ( d ) -> throw new Error "whaat" if d % 2 is 0; return d * 3 + 1
¶ trycatch $ ( d, send ) -> throw new Error "whaat" if d % 2 is 0; send d; send d * 3 + 1
¶ if_sad $show_warning()
¶ if_sad $ignore()
¶ drain()
pull ¶
pipe processing never calls any transform with sad value (except for those explicitly configured to accept those)
but all sad values are still passed on, cause errors at pipeline end (near drain) when not being filtered out
must not swallow exceptions implicitly as that would promote silent failures
benefit: simplify logic a great deal
benefit: may record errors and try to move on, then complain with summary of everything that went wrong